package com.atguigu.realtime.utils

import com.atguigu.common.utils.PropertiesUtil
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

/**
 * Created by Smexy on 2022/6/25
 */
object DStreamUtil {

  /**
   * Creates a direct Kafka input stream subscribed to a single topic.
   *
   * The consumer is configured with String key/value deserializers,
   * `auto.offset.reset = latest` and auto-commit disabled. The broker list is
   * read from the `kafka.broker.list` property via `PropertiesUtil`.
   *
   * @param streamingContext the streaming context the stream is attached to
   * @param groupId          Kafka consumer group id
   * @param topic            the single topic to subscribe to
   * @param ifSaveToMysql    when `true`, the stream is started from the explicitly
   *                         supplied `offsets`; when `false`, `offsets` is ignored
   * @param offsets          starting offset per partition; only used when
   *                         `ifSaveToMysql` is `true`. `null` (the default) means
   *                         "no explicit offsets" and results in a plain subscription.
   * @return the input stream of raw consumer records
   */
  def createDS(streamingContext: StreamingContext, groupId: String, topic: String,
               ifSaveToMysql: Boolean = false,
               offsets: Map[TopicPartition, Long] = null): InputDStream[ConsumerRecord[String, String]] = {

    // Kafka consumer configuration.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> PropertiesUtil.getProperty("kafka.broker.list"),
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> "false"
    )

    // Explicit offsets are only honoured in the transactional-output case.
    // Option(...) turns the nullable default into None so that a caller who sets
    // ifSaveToMysql = true without supplying offsets gets a plain subscription
    // instead of a NullPointerException from Subscribe.
    val explicitOffsets: Option[Map[TopicPartition, Long]] =
      if (ifSaveToMysql) Option(offsets) else None

    val consumerStrategy = explicitOffsets match {
      case Some(saved) => Subscribe[String, String](Array(topic), kafkaParams, saved)
      case None        => Subscribe[String, String](Array(topic), kafkaParams)
    }

    KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      consumerStrategy
    )
  }

}
